package com.raylu.utils;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Custom Flink source that emits a randomly generated {@code ClickEvent} at a fixed
 * interval until it is cancelled.
 *
 * <p>The type argument of {@link SourceFunction} declares the element type produced by
 * this source.
 */
public class ClickSource implements SourceFunction<ClickEvent> {
    /** Pause between two consecutive emitted events, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 100L;

    /** Pool of user names an event's user is drawn from. */
    private static final String[] USERS = {"Mary", "Alice", "Bob", "John", "Liz"};

    /** Pool of URLs an event's URL is drawn from. */
    private static final String[] URLS = {"./home", "./cart", "./fav", "./product?id=1", "./product?id=2"};

    /**
     * Loop flag for {@link #run}. It is cleared by {@link #cancel()}, which Flink invokes
     * from a different thread than the one executing {@code run}; it must therefore be
     * {@code volatile} so the emitting loop is guaranteed to observe the update.
     */
    private volatile boolean running = true;

    private final Random random = new Random();

    /**
     * Emits one event per interval for as long as the source has not been cancelled.
     *
     * @param ctx context used to emit records downstream
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<ClickEvent> ctx) throws Exception {
        while (running) {
            String user = USERS[random.nextInt(USERS.length)];
            String url = URLS[random.nextInt(URLS.length)];
            // Current wall-clock time in epoch milliseconds; same value Calendar would
            // report, without allocating a Calendar instance per event.
            long timestamp = System.currentTimeMillis();

            // collect() hands the record to the downstream operators.
            ctx.collect(new ClickEvent(user, url, timestamp));

            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /**
     * Requests the emitting loop in {@link #run} to stop, e.g. on {@code flink cancel <JobId>}.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
